{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved.\n",
        "\n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/pipeline-style-transfer/pipeline-style-transfer-parallel-run.png)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Neural style transfer on video\n",
        "Using modified code from `pytorch`'s neural style [example](https://pytorch.org/tutorials/advanced/neural_style_tutorial.html), we show how to setup a pipeline for doing style transfer on video. The pipeline has following steps:\n",
        "1. Split a video into images\n",
        "2. Run neural style on each image using one of the provided models (from `pytorch` pretrained models for this example).\n",
        "3. Stitch the image back into a video.\n",
        "\n",
        "> **Tip**\n",
        "If your system requires low-latency processing (to process a single document or small set of documents quickly), use [real-time scoring](https://docs.microsoft.com/en-us/azure/machine-learning/v1/how-to-consume-web-service) instead of batch prediction."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Prerequisites\n",
        "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first if you haven't. This sets you up with a working config file that has information on your workspace, subscription id, etc. "
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Initialize Workspace\n",
        "\n",
        "Initialize a workspace object from persisted configuration."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Check core SDK version number\n",
        "import azureml.core\n",
        "\n",
        "print(\"SDK version:\", azureml.core.VERSION)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Workspace, Experiment\n",
        "\n",
        "ws = Workspace.from_config()\n",
        "print('Workspace name: ' + ws.name, \n",
        "      'Azure region: ' + ws.location, \n",
        "      'Subscription id: ' + ws.subscription_id, \n",
        "      'Resource group: ' + ws.resource_group, sep = '\\n')"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.compute import AmlCompute, ComputeTarget\n",
        "from azureml.core import Datastore, Dataset\n",
        "from azureml.pipeline.core import Pipeline\n",
        "from azureml.pipeline.steps import PythonScriptStep\n",
        "from azureml.core.runconfig import CondaDependencies, RunConfiguration\n",
        "from azureml.core.compute_target import ComputeTargetException\n",
        "from azureml.data import OutputFileDatasetConfig"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Download models"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import os\n",
        "\n",
        "# create directory for model\n",
        "model_dir = 'models'\n",
        "if not os.path.isdir(model_dir):\n",
        "    os.mkdir(model_dir)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import urllib.request\n",
        "\n",
        "def download_model(model_name):\n",
        "    # downloaded models from https://pytorch.org/tutorials/advanced/neural_style_tutorial.html are kept here\n",
        "    url = \"https://pipelinedata.blob.core.windows.net/styletransfer/saved_models/\" + model_name\n",
        "    local_path = os.path.join(model_dir, model_name)\n",
        "    urllib.request.urlretrieve(url, local_path)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Register all Models"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.model import Model\n",
        "mosaic_model = None\n",
        "candy_model = None\n",
        "\n",
        "models = Model.list(workspace=ws, tags=['scenario'])\n",
        "for m in models:\n",
        "    print(\"Name:\", m.name,\"\\tVersion:\", m.version, \"\\tDescription:\", m.description, m.tags)\n",
        "    if m.name == 'mosaic' and mosaic_model is None:\n",
        "        mosaic_model = m\n",
        "    elif m.name == 'candy' and candy_model is None:\n",
        "        candy_model = m\n",
        "\n",
        "if mosaic_model is None:\n",
        "    print('Mosaic model does not exist, registering it')\n",
        "    download_model('mosaic.pth')\n",
        "    mosaic_model = Model.register(model_path = os.path.join(model_dir, \"mosaic.pth\"),\n",
        "                       model_name = \"mosaic\",\n",
        "                       tags = {'type': \"mosaic\", 'scenario': \"Style transfer using batch inference\"},\n",
        "                       description = \"Style transfer - Mosaic\",\n",
        "                       workspace = ws)\n",
        "else:\n",
        "    print('Reusing existing mosaic model')\n",
        "    \n",
        "\n",
        "if candy_model is None:\n",
        "    print('Candy model does not exist, registering it')\n",
        "    download_model('candy.pth')\n",
        "    candy_model = Model.register(model_path = os.path.join(model_dir, \"candy.pth\"),\n",
        "                       model_name = \"candy\",\n",
        "                       tags = {'type': \"candy\", 'scenario': \"Style transfer using batch inference\"},\n",
        "                       description = \"Style transfer - Candy\",\n",
        "                       workspace = ws)\n",
        "else:\n",
        "    print('Reusing existing candy model')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Create or use existing compute\n",
        "\n",
        "> Note that if you have an AzureML Data Scientist role, you will not have permission to create compute resources. Talk to your workspace or IT admin to create the compute targets described in this section, if they do not already exist."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# AmlCompute\n",
        "cpu_cluster_name = \"cpu-cluster\"\n",
        "try:\n",
        "    cpu_cluster = AmlCompute(ws, cpu_cluster_name)\n",
        "    print(\"found existing cluster.\")\n",
        "except ComputeTargetException:\n",
        "    print(\"creating new cluster\")\n",
        "    provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_v2\",\n",
        "                                                                    max_nodes = 1)\n",
        "\n",
        "    # create the cluster\n",
        "    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, provisioning_config)\n",
        "    cpu_cluster.wait_for_completion(show_output=True)\n",
        "    \n",
        "# AmlCompute\n",
        "gpu_cluster_name = \"gpu-cluster\"\n",
        "try:\n",
        "    gpu_cluster = AmlCompute(ws, gpu_cluster_name)\n",
        "    print(\"found existing cluster.\")\n",
        "except ComputeTargetException:\n",
        "    print(\"creating new cluster\")\n",
        "    provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"Standard_NC6s_v3\",\n",
        "                                                                max_nodes = 3)\n",
        "\n",
        "    # create the cluster\n",
        "    gpu_cluster = ComputeTarget.create(ws, gpu_cluster_name, provisioning_config)\n",
        "    gpu_cluster.wait_for_completion(show_output=True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Python Scripts\n",
        "We use an edited version of `neural_style_mpi.py` (original is [here](https://github.com/pytorch/examples/blob/master/fast_neural_style/neural_style/neural_style.py)). Scripts to split and stitch the video are thin wrappers to calls to `ffmpeg`. \n",
        "\n",
        "We install `ffmpeg` through conda dependencies."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "scripts_folder = \"scripts\""
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "process_video_script_file = \"process_video.py\"\n",
        "\n",
        "# peek at contents\n",
        "with open(os.path.join(scripts_folder, process_video_script_file)) as process_video_file:\n",
        "    print(process_video_file.read())"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "stitch_video_script_file = \"stitch_video.py\"\n",
        "\n",
        "# peek at contents\n",
        "with open(os.path.join(scripts_folder, stitch_video_script_file)) as stitch_video_file:\n",
        "    print(stitch_video_file.read())"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The sample video **organutan.mp4** is stored at a publicly shared datastore. We are registering the datastore below. If you want to take a look at the original video, click here. (https://pipelinedata.blob.core.windows.net/sample-videos/orangutan.mp4)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# datastore for input video\n",
        "account_name = \"pipelinedata\"\n",
        "video_ds = Datastore.register_azure_blob_container(ws, \"videos\", \"sample-videos\",\n",
        "                                            account_name=account_name, overwrite=True)\n",
        "\n",
        "# the default blob store attached to a workspace\n",
        "default_datastore = ws.get_default_datastore()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Sample video"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "video_name=os.getenv(\"STYLE_TRANSFER_VIDEO_NAME\", \"orangutan.mp4\") \n",
        "orangutan_video = Dataset.File.from_files((video_ds,video_name))"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "cd = CondaDependencies.create(python_version=\"3.8\", conda_packages=['pip==20.2.4'])\n",
        "\n",
        "cd.add_channel(\"conda-forge\")\n",
        "cd.add_conda_package(\"ffmpeg==4.0.2\")\n",
        "\n",
        "# Runconfig\n",
        "amlcompute_run_config = RunConfiguration(conda_dependencies=cd)\n",
        "amlcompute_run_config.environment.docker.base_image = \"pytorch/pytorch\"\n",
        "amlcompute_run_config.environment.spark.precache_packages = False"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "ffmpeg_audio = OutputFileDatasetConfig(name=\"ffmpeg_audio\")\n",
        "processed_images = OutputFileDatasetConfig(name=\"processed_images\")\n",
        "output_video = OutputFileDatasetConfig(name=\"output_video\")\n",
        "\n",
        "ffmpeg_images = OutputFileDatasetConfig(name=\"ffmpeg_images\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Define tweakable parameters to pipeline\n",
        "These parameters can be changed when the pipeline is published and rerun from a REST call.\n",
        "As part of ParallelRunStep following 2 pipeline parameters will be created which can be used to override values.\n",
        "    node_count\n",
        "    process_count_per_node"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core.graph import PipelineParameter\n",
        "# create a parameter for style (one of \"candy\", \"mosaic\") to transfer the images to\n",
        "style_param = PipelineParameter(name=\"style\", default_value=\"mosaic\")\n",
        "# create a parameter for the number of nodes to use in step no. 2 (style transfer)\n",
        "nodecount_param = PipelineParameter(name=\"nodecount\", default_value=2)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "split_video_step = PythonScriptStep(\n",
        "    name=\"split video\",\n",
        "    script_name=\"process_video.py\",\n",
        "    arguments=[\"--input_video\", orangutan_video.as_mount(),\n",
        "               \"--output_audio\", ffmpeg_audio,\n",
        "               \"--output_images\", ffmpeg_images],\n",
        "    compute_target=cpu_cluster,\n",
        "    runconfig=amlcompute_run_config,\n",
        "    source_directory=scripts_folder\n",
        ")\n",
        "\n",
        "stitch_video_step = PythonScriptStep(\n",
        "    name=\"stitch\",\n",
        "    script_name=\"stitch_video.py\",\n",
        "    arguments=[\"--images_dir\", processed_images.as_input(), \n",
        "               \"--input_audio\", ffmpeg_audio.as_input(), \n",
        "               \"--output_dir\", output_video],\n",
        "    compute_target=cpu_cluster,\n",
        "    runconfig=amlcompute_run_config,\n",
        "    source_directory=scripts_folder\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Create environment, parallel step run config and parallel run step"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Environment\n",
        "from azureml.core.runconfig import DEFAULT_GPU_IMAGE\n",
        "\n",
        "parallel_cd = CondaDependencies.create(python_version=\"3.8\", conda_packages=['pip==20.2.4', 'numpy==1.19'])\n",
        "\n",
        "parallel_cd.add_channel(\"pytorch\")\n",
        "parallel_cd.add_conda_package(\"pytorch\")\n",
        "parallel_cd.add_conda_package(\"torchvision\")\n",
        "parallel_cd.add_conda_package(\"pillow<7\") # needed for torchvision==0.4.0\n",
        "\n",
        "styleenvironment = Environment(name=\"styleenvironment\")\n",
        "styleenvironment.python.conda_dependencies=parallel_cd\n",
        "styleenvironment.docker.base_image = DEFAULT_GPU_IMAGE"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import PipelineParameter\n",
        "from azureml.pipeline.steps import ParallelRunConfig\n",
        "\n",
        "parallel_run_config = ParallelRunConfig(\n",
        "    environment=styleenvironment,\n",
        "    entry_script='transform.py',\n",
        "    output_action='summary_only',\n",
        "    mini_batch_size=\"1\",\n",
        "    error_threshold=1,\n",
        "    source_directory=scripts_folder,\n",
        "    compute_target=gpu_cluster, \n",
        "    node_count=nodecount_param,\n",
        "    process_count_per_node=2\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.steps import ParallelRunStep\n",
        "from datetime import datetime\n",
        "\n",
        "parallel_step_name = 'styletransfer-' + datetime.now().strftime('%Y%m%d%H%M')\n",
        "\n",
        "distributed_style_transfer_step = ParallelRunStep(\n",
        "    name=parallel_step_name,\n",
        "    inputs=[ffmpeg_images], # Input file share/blob container/file dataset\n",
        "    output=processed_images,  # Output file share/blob container\n",
        "    arguments=[\"--style\", style_param],\n",
        "    parallel_run_config=parallel_run_config,\n",
        "    allow_reuse=False #[optional - default value True]\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Run the pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline = Pipeline(workspace=ws, steps=[stitch_video_step])\n",
        "\n",
        "pipeline.validate()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# submit the pipeline and provide values for the PipelineParameters used in the pipeline\n",
        "pipeline_run = Experiment(ws, 'styletransfer_parallel_mosaic').submit(pipeline)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Monitor pipeline run\n",
        "\n",
        "The pipeline run status could be checked in Azure Machine Learning portal (https://ml.azure.com). The link to the pipeline run could be retrieved by inspecting the `pipeline_run` object.\n",
        "\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# This will output information of the pipeline run, including the link to the details page of portal.\n",
        "pipeline_run"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Optional: View detailed logs (streaming) "
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Wait the run for completion and show output log to console\n",
        "pipeline_run.wait_for_completion(show_output=True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Download output video"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Downloads the video in `output_video` folder"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "def download_video(run, target_dir=None):\n",
        "    stitch_run = run.find_step_run(stitch_video_step.name)[0]\n",
        "    port_data = stitch_run.get_details()['outputDatasets'][0]['dataset']\n",
        "    port_data.download(target_dir)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_run.wait_for_completion()\n",
        "download_video(pipeline_run, \"output_video_mosaic\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Publish pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_name = \"style-transfer-batch-inference\"\n",
        "print(pipeline_name)\n",
        "\n",
        "published_pipeline = pipeline.publish(\n",
        "    name=pipeline_name, \n",
        "    description=pipeline_name)\n",
        "print(\"Newly published pipeline id: {}\".format(published_pipeline.id))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Get published pipeline\n",
        "This is another way to get the published pipeline."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import PublishedPipeline\n",
        "\n",
        "# You could retrieve all pipelines that are published, or \n",
        "# just get the published pipeline object that you have the ID for.\n",
        "\n",
        "# Get all published pipeline objects in the workspace\n",
        "all_pub_pipelines = PublishedPipeline.list(ws)\n",
        "\n",
        "# We will iterate through the list of published pipelines and \n",
        "# use the last ID in the list for Schelue operations: \n",
        "print(\"Published pipelines found in the workspace:\")\n",
        "for pub_pipeline in all_pub_pipelines:\n",
        "    print(\"Name:\", pub_pipeline.name,\"\\tDescription:\", pub_pipeline.description, \"\\tId:\", pub_pipeline.id, \"\\tStatus:\", pub_pipeline.status)\n",
        "    if(pub_pipeline.name == pipeline_name):\n",
        "        published_pipeline = pub_pipeline\n",
        "\n",
        "print(\"Published pipeline id: {}\".format(published_pipeline.id))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Run pipeline through REST calls for other styles\n",
        "\n",
        "# Get AAD token"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.authentication import InteractiveLoginAuthentication\n",
        "import requests\n",
        "\n",
        "auth = InteractiveLoginAuthentication()\n",
        "aad_token = auth.get_authentication_header()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Get endpoint URL"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "rest_endpoint = published_pipeline.endpoint\n",
        "print(\"Pipeline REST endpoing: {}\".format(rest_endpoint))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Send request and monitor"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "experiment_name = 'styletransfer_parallel_candy'\n",
        "response = requests.post(rest_endpoint, \n",
        "                         headers=aad_token,\n",
        "                         json={\"ExperimentName\": experiment_name,\n",
        "                               \"ParameterAssignments\": {\"style\": \"candy\", \"NodeCount\": 3}})\n",
        "\n",
        "run_id = response.json()[\"Id\"]\n",
        "\n",
        "from azureml.pipeline.core.run import PipelineRun\n",
        "published_pipeline_run_candy = PipelineRun(ws.experiments[experiment_name], run_id)\n",
        "\n",
        "# Show detail information of run\n",
        "published_pipeline_run_candy"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Download output from re-run"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "published_pipeline_run_candy.wait_for_completion()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "download_video(published_pipeline_run_candy, target_dir=\"output_video_candy\")"
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "sanpil joringer asraniwa pansav tracych"
      }
    ],
    "category": "Other notebooks",
    "compute": [
      "AML Compute"
    ],
    "datasets": [],
    "deployment": [
      "None"
    ],
    "exclude_from_index": true,
    "framework": [
      "None"
    ],
    "friendly_name": "Style transfer using ParallelRunStep",
    "index_order": 1,
    "kernelspec": {
      "display_name": "Python 3.8 - AzureML",
      "language": "python",
      "name": "python38-azureml"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.9"
    },
    "tags": [
      "Batch Inferencing",
      "Pipeline"
    ],
    "task": "Style transfer"
  },
  "nbformat": 4,
  "nbformat_minor": 2
}